# Get ZTRAX assessor for pre-2016 fires                                                                           

# Setup ----

source("code/globals.R")

pacman::p_load(data.table, readxl, tidyverse, parallel, fst)

## These lines set several options
options(scipen = 999) # Do not print scientific notation
options(stringsAsFactors = FALSE) ## Do not load strings as factors

ztraxHistLoc <- file.path(ZTRAX, "HistoricalAssessment")

# Prepare for data read using file layouts -----

## Read in the file layouts to get column names for each table
layoutZAsmt <- read_excel(file.path(ztraxHistLoc, "LayoutAsmtHistory.xlsx"), sheet = 1)
## Get column data types
table(layoutZAsmt$DateType, useNA = "always")
layoutZAsmt$R_coltype <- NA
layoutZAsmt$R_coltype[layoutZAsmt$DateType %in%
                        c("bigint", "decimal", "int", "money", "smallint", "tinyint")] <- "numeric" #' numeric' for fread
layoutZAsmt$R_coltype[layoutZAsmt$DateType %in%
                        c("char", "date", "uniqueidentifier", "varchar")] <- "character" #' character' for fread


# Create a list of desired variables for the main table
layout_main <- layoutZAsmt %>% 
  filter(TableName == "utMain") %>%
  mutate(select = ifelse(FieldName %in% c("RowID", "ImportParcelID", "FIPS", "Edition", "ExtractDate", "ValueCertDate",
                                          "UnformattedAssessorParcelNumber", "PropertyFullStreetAddress", "PropertyZip",
                                          "PropertyStreetName", "PropertyHouseNumber", "PropertyStreetSuffix",
                                          "PropertyAddressLatitude", "PropertyAddressLongitude", "PropertyStreetPreDirectional", "PropertyStreetPostDirectional",
                                          "LotSizeAcres", "TaxAmount", "TaxYear", "BatchID", "SourceChkSum"), TRUE, FALSE))


# Create a list of desired variables for the building table
layout_building <- layoutZAsmt %>% filter(TableName == "utBuilding") %>%
  mutate(select = ifelse(FieldName %in% c("RowID", "NoOfUnits", "BuildingOrImprovementNumber", "YearBuilt", "EffectiveYearBuilt", "YearRemodeled",
                                          "TotalBedrooms", "TotalCalculatedBathCount", "RoofCoverStndCode", "RoofStructureTypeStndCode",
                                          "PropertyLandUseStndCode", "BuildingClassStndCode", "BuildingQualityStndCode", "BuildingQualityStndCodeOriginal"), TRUE, FALSE))

# Create a list of desired variables for the building areas table
layout_building_areas <- layoutZAsmt %>% filter(TableName == "utBuildingAreas") %>%
  mutate(select = ifelse(FieldName %in% c("RowID", "BuildingOrImprovementNumber", "BuildingAreaStndCode", "BuildingAreaSqFt"), TRUE, FALSE))

# Create a list of desired variables for the value table
layout_value <- layoutZAsmt %>% filter(TableName == "utValue") %>%
  mutate(select = ifelse(FieldName %in% c("RowID", "TotalAssessedValue", "ImprovementAssessedValue", "AssessmentYear",
                                          "TotalMarketValue", "ImprovementMarketValue", "MarketValueYear",
                                          "ImprovementAppraisalValue", "AppraisalValueYear"), TRUE, FALSE))


# Construct a list of counties we want to pull assessment data from ----
cacounties <- readRDS(file.path(WORKING, "counties_in_dins_data.RDS")) %>% # restricted: counties_before_sept_2016.Rds"
  as.character()

ca_oldfire_counties <- readRDS(file.path(WORKING, "counties_in_older_CA_fires.Rds")) %>%
  as.character()

othercounties <- readRDS(file.path(WORKING, "countylist_other_states.RDS")) %>%
  as.character()

counties <- c(cacounties, ca_oldfire_counties, othercounties) %>%
  unique()

##### counties<-c('41001') # For testing

# Format FIPS properly # TODO: Does this screw anything up?
counties <- sprintf("%05d", as.numeric(counties))

# Make a list of required "as of" dates for each state ------
## Will create tables that have the most recent assessment prior to each of these dates

california_dates <- c(
  "2003-01-01",
  "2007-01-01",
  "2008-01-01",
  "2009-01-01",
  "2011-01-01",
  "2012-01-01",
  "2013-01-01",
  "2015-01-01"
)

arizona_dates <- c("2013-01-01")

colorado_dates <- c(
  "2012-01-01",
  "2013-01-01"
)

oregon_dates <- character() # Oregon fires are all 2020; get from new ZTRAX

washington_dates <- c(
  "2014-01-01",
  "2015-01-01"
)

required_snapshots <- data.frame(
  state = c(
    rep("06", length(california_dates)),
    rep("04", length(arizona_dates)),
    rep("08", length(colorado_dates)),
    rep("41", length(oregon_dates)),
    rep("53", length(washington_dates))
  ),
  day = c(
    california_dates,
    arizona_dates,
    colorado_dates,
    oregon_dates,
    washington_dates
  )
) %>%
  mutate(day = as.Date(day))

## Crosswalk of state names and FIPS codes
fips_and_abbr <- data.frame(
  fips = c("04", "06", "08", "35", "41", "53"),
  stateabbr = c("AZ", "CA", "CO", "NM", "OR", "WA")
)


# DEFINE READ FUNCTIONS -----

## Read main table ----
f_readmain <- function(state, threads) {
  # state <- "06"
  # threads <- 16
  # END DEBUG
  
  # Use data from layout table to A) create a vector for the fread select parameter and B) create a vector for the col.names parameter
  cols_to_select <- layout_main %>% filter(select == TRUE) %>% pull(R_coltype)
  names(cols_to_select) <- paste0("V", layout_main %>% filter(select == TRUE) %>% pull(column_id))
  
  names_to_load <- layout_main %>% filter(select == TRUE) %>% pull(FieldName)
  
  base <- fread(file.path(ztraxHistLoc, state, "ZAsmt/Main.txt"),
                sep = "|",
                header = FALSE,
                stringsAsFactors = FALSE,
                quote = "",
                select = cols_to_select,
                col.names = names_to_load,
                nThread = threads,
                nrows = NROWS_GLOBAL
  )
  
  # Limit to desired counties
  base <- base[FIPS %in% counties]
  
  # Clear memory
  gc()
  
  # write_fst(base,file.path(ztraxHistLoc,paste0(state,outsuffix,'.fst')))
  return(base)
}

## Read Building Table -----
f_readbldg <- function(state, threads) {
  
  # Use data from layout table to A) create a vector for the fread select parameter and B) create a vector for the col.names parameter
  cols_to_select <- layout_building %>% filter(select == TRUE) %>% pull(R_coltype)
  names(cols_to_select) <- paste0("V", layout_building %>% filter(select == TRUE) %>% pull(column_id))
  
  names_to_load <- layout_building %>% filter(select == TRUE) %>% pull(FieldName)
  
  bldg <- fread(file.path(ztraxHistLoc, state, "ZAsmt/Building.txt"),
                sep = "|",
                header = FALSE,
                stringsAsFactors = FALSE,
                quote = "",
                select = cols_to_select,
                col.names = names_to_load,
                nThread = threads,
                nrows = NROWS_GLOBAL
  )
  
  # Limit to desired property types
  bldg <- bldg[PropertyLandUseStndCode %in% c(
    "RR000", # RESIDENTIAL GENERAL
    "RR101", # SINGLE FAMILY RESIDENTIAL
    "RR102", # RURAL RESIDENCE
    "RR103", # MOBILE HOME
    "RR104", # TOWNHOUSE
    "RR105", # CLUSTER HOME
    "RR106", # CONDOMINIUM
    "RR107", # COOPERATIVE
    "RR108", # ROW HOUSE
    "RR109", # PLANNED UNIT DEVELOPMENT
    "RR110", # RESIDENTIAL COMMON AREA
    "RR111", # TIMESHARE
    "RR112", # SEASONAL, CABIN, VACATION RESIDENCE
    "RR113", # BUNGALOW
    "RR114", # ZERO LOT LINE
    "RR115", # MANUFACTURED, MODULAR, PREFABRICATED HOMES
    "RR116", # PATIO HOME
    "RR117", # RESIDENTIAL PARKING GARAGE
    "RR118", # MISCELLANEOUS IMPROVEMENT
    "RR119", # GARDEN HOME
    "RR120", # LANDOMINIUM
    "RR999", # INFERRED SINGLE FAMILY RESIDENTIAL
    "RI000", # RESIDENTIAL INCOME GENERAL (MULTI FAMILY)
    "RI101", # DUPLEX (2 UNITS, ANY COMBINATION)
    "RI102", # TRIPLEX (3 UNITS, ANY COMBINATION)
    "RI103", # QUADRUPLEX (4 UNITS, ANY COMBINATION)
    "RI104", # APARTMENT BUILDING (5+ UNITS)
    "RI105", # APARTMENT BUILDING (100+ UNITS)
    "RI106", # GARDEN APARTMENT, COURT APARTMENT (5+ UNITS)
    "RI107", # HIGH-RISE APARTMENT
    "RI108", # BOARDING HOUSE ROOMING HOUSE APT HOTEL TRANSIENT LODGING
    "RI109", # MOBILE HOME PARK, TRAILER PARK
    "RI110", # MULTIFAMILY DWELLING (GENERIC ANY COMBINATION 2+)
    "RI111", # FRATERNITY HOUSE, SORORITY HOUSE
    "RI112", # APARTMENT (GENERIC)
    "RI113", # DORMITORY, GROUP QUARTERS (RESIDENTIAL)
    "RI114"
  ) # RESIDENTIAL CONDOMINIUM DEVELOPMENT (ASSOCIATION ASSESSMENT)
  ]
  
  
  # write_fst(base,file.path(ztraxHistLoc,paste0(state,outsuffix,'.fst')))
  return(bldg)
}

## Read value table ----
f_readval <- function(state, threads) {
  # Use data from layout table to A) create a vector for the fread select parameter and B) create a vector for the col.names parameter
  cols_to_select <- layout_value %>% filter(select == TRUE) %>% pull(R_coltype)
  names(cols_to_select) <- paste0("V", layout_value %>% filter(select == TRUE) %>% pull(column_id))
  
  names_to_load <- layout_value %>% filter(select == TRUE) %>% pull(FieldName)
  
  # Load data
  val <- fread(file.path(ztraxHistLoc, state, "ZAsmt/Value.txt"),
               sep = "|",
               header = FALSE,
               stringsAsFactors = FALSE,
               quote = "",
               select = cols_to_select, 
               col.names = names_to_load,
               nThread = threads,
               nrows = NROWS_GLOBAL
  )
  
  
  # write_fst(base,file.path(ztraxHistLoc,paste0(state,outsuffix,'.fst')))
  return(val)
}

## Read building areas table ----
f_readsqft <- function(state, threads) {
  
  # Use data from layout table to A) create a vector for the fread select parameter and B) create a vector for the col.names parameter
  cols_to_select <- layout_building_areas %>% filter(select == TRUE) %>% pull(R_coltype)
  names(cols_to_select) <- paste0("V", layout_building_areas %>% filter(select == TRUE) %>% pull(column_id))
  
  names_to_load <- layout_building_areas %>% filter(select == TRUE) %>% pull(FieldName)
  
  sqft <- fread(file.path(ztraxHistLoc, state, "ZAsmt/BuildingAreas.txt"),
                sep = "|",
                header = FALSE,
                stringsAsFactors = FALSE,
                quote = "",
                select = cols_to_select,
                col.names = names_to_load,
                nThread = threads,
                nrows = NROWS_GLOBAL
  )
  
  
  
  sqft <- sqft[
    BuildingAreaStndCode %in% c(
      "BAL", # Building Area Living
      "BAF", # Building Area Finished
      "BAE", # Effective Building Area
      "BAG", # Gross Building Area
      "BAJ", # Building Area Adjusted
      "BAT", # Building Area Total
      "BLF"
    ), # Building Area Finished Living
  ]
  
  # Svae the largest area as our measure of square footage
  sqft <- sqft[, list(sqfeet = max(BuildingAreaSqFt, na.rm = T)), by = c("RowID", "BuildingOrImprovementNumber")]
  
  
  return(sqft)
}



#############################################################################################################
####### Import all 4 tables for a given state, merge them together, and save a full dataset for the state ###
#############################################################################################################

read_one_state <- function(state) {
  # state <- "06"
  # END DEBUG
  # thr <- 16
  thr <- 1 # DEBUG
  
  print(paste0("starting Main table using fread with ", thr, " threads"))
  ptm <- proc.time()
  base <- f_readmain(state, threads = thr)
  print(proc.time() - ptm)
  
  # Drop any duplicated rows by RowID, these are clear data entry duplications.
  base <- unique(base, by = "RowID")
  gc()
  
  setkey(base, RowID)
  
  # Load and merge in building table
  # Note: We do these loads as needed to limit memory usage
  print(paste0("starting Building table using fread with ", thr, " threads"))
  ptm <- proc.time()
  bldg <- f_readbldg(state, threads = thr)
  print(proc.time() - ptm)
  
  print("merging building")
  setkey(bldg, RowID)
  attr <- merge(base, bldg, by = "RowID") # Note this only keeps rows that appear in both datasets (i.e., all.x=FALSE)
  stopifnot(anyDuplicated(attr, by = c("RowID", "BuildingOrImprovementNumber")) == 0)
  rm(base, bldg) # these are huge; get them out of memory
  gc()
  
  setkey(attr, RowID)
  
  
  ## Load and merge value table
  print(paste0("starting Value table using fread with ", thr, " threads"))
  ptm <- proc.time()
  val <- f_readval(state, threads = thr)
  print(proc.time() - ptm)
  
  print("merging value")
  setkey(val, RowID)
  attr <- merge(attr, val, by = "RowID", all.x = TRUE) # Here, I keep all rows in the left file because not every property has value info
  stopifnot(anyDuplicated(attr, by = c("RowID", "BuildingOrImprovementNumber")) == 0)
  rm(val)
  gc()
  
  # Load an merge square footage table
  print(paste0("starting SqFt table using fread with ", thr, " threads"))
  ptm <- proc.time()
  sqft <- f_readsqft(state, threads = thr)
  print(proc.time() - ptm)
  
  print("setting keys to merge square footage")
  setkey(attr, RowID, BuildingOrImprovementNumber)
  setkey(sqft, RowID, BuildingOrImprovementNumber)
  print("merging square footage -- this seems to cause memory errors..")
  attr <- merge(attr, sqft, by = c("RowID", "BuildingOrImprovementNumber"), all.x = TRUE) ## Note the two merge fields and the all.x=TRUE
  stopifnot(anyDuplicated(attr, by = c("RowID", "BuildingOrImprovementNumber")) == 0)
  rm(sqft)
  gc()
  
  # Save full state historical assessment data (for counties in import list)
  write_fst(attr, file.path(WORKING, "ztraxdata", paste0("full", state, ".fst")))
  
  return(attr)
}

########################################################################################
##### Make 'as of' databases with most recent assessment prior to various dates ########
########################################################################################

make_as_of <- function(as_of_date, statefips, fullstatedata) {
  # as_of_date <- "2013-01-01"
  # statefips <- "06"
  # fullstatedata <- read_fst(file.path(WORKING, "ztraxdata", paste0("full", statefips, ".fst")), as.data.table = T)
  # END DEBUG

  print(sprintf("Starting %s as of %s", statefips, as_of_date))
  
  d <- fullstatedata
  
  ###### For each ImportParcelID, pick the 1 most recent record prior to cutoff date
  d <- d %>%
    #- Process extract date information into sortable time field
    mutate(
      ExtractYear = as.numeric(substr(ExtractDate, 3, 6)),
      ExtractMonth = as.numeric(substr(ExtractDate, 1, 2)),
      date = paste(ExtractYear, ExtractMonth, "01", sep = "-"),
      date = as.Date(date)
    ) %>%
    #- Drop records more recent than 'as of' date
    dplyr::filter(date < as_of_date) %>%
    #- Keep only most recent from remaining
    group_by(ImportParcelID) %>%
    arrange(ImportParcelID, date) %>%
    dplyr::filter(row_number() == n())
  
  ###### Confirm dates and IDs are valid/not missing
  stopifnot(sum(is.na(d$ExtractMonth)) == 0 & sum(is.na(d$ExtractYear)) == 0 & sum(is.na(d$date)) == 0)
  ###### Confirm ImportParcelID uniquely identifies records
  stopifnot(anyDuplicated(d$ImportParcelID) == 0)
  
  ##### Issue to handle: Duplicated Assessor Parcel Numbers (rare).
  # For a small number of properties, there appear to
  # have been changes in ImportParcelID over time, so that the same APN appears twice with two different ImportParcelIDs.
  # VISUAL INSPECTION SHOWS THESE ARE ALMOST ALL PERFECT DUPLICATES, SO I KEEP MOST RECENT EXTRACT DATE WHEN THIS OCCURS.
  #- Count duplicates
  s <- sum(duplicated(d[, c("FIPS", "UnformattedAssessorParcelNumber")]) & !is.na(d$UnformattedAssessorParcelNumber))
  if (s > nrow(d) * 0.005) {
    print(paste0(s, " duplicated out of ", nrow(d))) # warn but don't stop if there are lots of dupes
  }
  
  # - Remove the multiple ImportParcelIDs within APN
  d <- d %>%
    group_by(FIPS, UnformattedAssessorParcelNumber) %>%
    arrange(FIPS, UnformattedAssessorParcelNumber, date) %>% # sorts by extract date within groups)
    dplyr::filter(row_number() == n() |
                    is.na(UnformattedAssessorParcelNumber)) ## We keep properties with NA APN here, can still be included as non-destroyed
  
  ## Confirm that FIPS and UAPN now uniquely identify properties (except for properties with missing UAPN)
  stopifnot(anyDuplicated(d[!is.na(d$UnformattedAssessorParcelNumber), c("FIPS", "UnformattedAssessorParcelNumber")]) == 0)
  
  ### FIPS to numeric for consistency downstream
  d <- d %>%
    mutate(FIPS = as.numeric(FIPS))
  
  ### Save the cleaned file
  outname <- paste0(
    fips_and_abbr$stateabbr[fips_and_abbr$fips == statefips],
    "attr",
    year(as_of_date),
    ".fst"
  )
  write_fst(d, file.path(WORKING, "ztraxdata", outname))
  
  # Before returning, try to clean up memory
  rm(d)
  gc()
  
  return(NULL)
}



### For efficiency, load each state's full data, apply this function over all required dates
import <- function(this_state) {
  fullstate <- read_one_state(this_state)
  
  # DEBUG: Load full state data from previously saved memory
  # fullstate <- read_fst(file.path(WORKING, "ztraxdata", paste0("full", this_state, ".fst")), as.data.table = T)
  
  mclapply(required_snapshots %>% filter(state == this_state) %>% pull(day),
           make_as_of,
           statefips = this_state,
           fullstatedata = fullstate
  )
}

# Run import functions for all states -----

# start with CA
import("06")
import("04")
import("08")
import("41")
import("53")
